package com.spx.chapter05.sink;

import com.spx.chapter05.pojo.Event;
import com.spx.util.SampleDataUtil;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

/**
 * create by undeRdoG on  2022-05-01  15:44
 * 凡心所向，素履以往，生如逆旅，一苇以航。
 */
public class RedisSinkTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataStreamSource<Event> dataSource = env.fromCollection(SampleDataUtil.getSample());


        // jedis 链接配置
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("localhost")
                .setPort(6379)
                .build();

        dataSource.addSink(new RedisSink<>(config, new MyRedisMapper()));

        env.execute();
    }


    // 自定义类实现redis接口
    public static class MyRedisMapper implements RedisMapper<Event>{

        @Override
        public RedisCommandDescription getCommandDescription() {
            // additionalKey 表示要写入的集合名称
            return new RedisCommandDescription(RedisCommand.HSET,"allows");
        }

        @Override
        public String getKeyFromData(Event data) {
            return data.user;
        }

        @Override
        public String getValueFromData(Event data) {
            return data.toString();
        }
    }
}
